Pair RDD

Spark defines the PairRDDFunctions class, which provides several functions for working with pair RDDs, that is, RDDs of key-value pairs. In this tutorial, we will learn these functions with Scala examples. Pair RDDs come in handy when you need to apply transformations such as hash partitioning, set operations, and joins.
  • Spark pair RDDs are simply RDDs whose elements are key-value pairs, that is, two-element tuples.
  • The keys and values can be of any type.
  • The key is the identifier, and the value is the data associated with that key.
  • Pair RDDs provide a few special operations, such as distributed “shuffle” operations that group or aggregate the elements by key.
  • Pair RDDs are useful when implementing MapReduce algorithms.
  • On pair RDDs, the reduceByKey() method aggregates the data separately for each key, and the join() method merges two RDDs by grouping elements that have the same key.
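As a rough illustration of the MapReduce point above, a word count can be sketched with a pair RDD and reduceByKey(). The input lines and the application name are made up for this example, and the sketch assumes it runs in spark-shell, a notebook, or local mode.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Reuse the running SparkContext (spark-shell, notebook) or start a local one.
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[*]").setAppName("pair-rdd-word-count"))

// Hypothetical in-memory input; a real job would usually read a file with sc.textFile.
val lines = sc.parallelize(Seq("a b a", "b c", "a"))

// "Map" step: emit a (word, 1) pair for every word.
val ones = lines.flatMap(_.split(" ")).map(word => (word, 1))

// "Reduce" step: sum the counts separately for each key.
val counts = ones.reduceByKey(_ + _)

// Bring the small result back to the driver as a Map and print it.
val countsMap = counts.collectAsMap()
countsMap.toSeq.sorted.foreach(println)
```

Here the key is the word and the value is a running count, so reduceByKey() only ever combines values that belong to the same word.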
Create a pair RDD
val lines=sc.textFile("/FileStore/tables/data.txt")
val pairs = lines.map(x => (x.split(" ")(0), x))
pairs.collect.foreach(println)
 
val data=sc.parallelize(List("ramesh","Adam"))
val pair=data.keyBy(_.charAt(0))
pair.collect.foreach(println)

val rdd=sc.textFile("/FileStore/tables/data.txt").flatMap(x => x.split(" ")).map(words => (words,words.length))
rdd.collect.foreach(println)


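val rdd = sc.textFile("/Data/data.txt")
// Use the first comma-separated field as the key and keep the whole line as the value
val pairRdd = rdd.map { x =>
  val str = x.split(",")
  (str(0), x)
}
// Note: on a cluster, println inside foreach runs on the executors, not on the driver
pairRdd.foreach(x => println(x._1 + " " + x._2))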
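// Using the pair RDD created with keyBy above: mapValues changes only the values and keeps the keys
pair.mapValues(x => "Mr." + x).collect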


Actions on pair RDDs
val rdd = sc.parallelize(Seq(("math",55),("math",56),("english", 57),("english", 58),("science", 59),("science", 54)))
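// countByKey is an action: it returns a Map from each key to the number of elements with that key
val result1 = rdd.countByKey()
result1.foreach(println)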


val rdd = sc.parallelize(Seq(("math",55),("math",56),("english", 57),("english", 58),("science", 59),("science", 54)))
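// collectAsMap returns the pairs to the driver as a Map; when a key occurs more than once, only one of its values is kept
val result2 = rdd.collectAsMap()

// lookup returns all values stored under the given key, here 55 and 56
val result3 = rdd.lookup("math")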

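val rdd = sc.parallelize(List("hello","world","good","morning"))
// Key each word by its length
val rdd1 = rdd.map(x => (x.length, x))
rdd1.collect.foreach(println)

// groupByKey collects all values that share a key into one Iterable
val rdd2 = rdd1.groupByKey()
// reduceByKey merges the values of each key; for strings, + concatenates them
val rdd3 = rdd1.reduceByKey((x, y) => x + y)
// Sort by key in descending order; collect returns the result to the driver as an Array
val rdd4 = rdd3.sortByKey(false).collect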
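To make the difference between grouping and aggregating more concrete, here is a small sketch that computes the same per-key totals in two ways. It reuses the subject marks from the actions example; the variable names and the application name are chosen only for this illustration.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Reuse the running SparkContext (spark-shell, notebook) or start a local one.
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[*]").setAppName("pair-rdd-group-vs-reduce"))

val marks = sc.parallelize(Seq(
  ("math", 55), ("math", 56), ("english", 57),
  ("english", 58), ("science", 59), ("science", 54)))

// Grouping: gather every value of a key into one Iterable, then sum it.
val totalsViaGroup = marks.groupByKey().mapValues(_.sum).collectAsMap()

// Aggregating: combine the values of each key pairwise with a function.
val totalsViaReduce = marks.reduceByKey(_ + _).collectAsMap()

totalsViaReduce.toSeq.sorted.foreach(println)
```

Both approaches give the same totals here. reduceByKey() can combine values within each partition before the shuffle, so it is usually the better fit when only an aggregate per key is needed.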

val rdd1 = sc.parallelize(List((110, 50.35), (127, 305.2), (126, 211.0),(105, 6.0),(165, 31.0), (110, 40.11)))
val rdd2 = sc.parallelize(List((110, "a"), (127, "b"), (126, "b"),  (105, "a"),(165, "c")))
// Inner join on the key; key 110 occurs twice in rdd1, so it produces two joined pairs
val join = rdd1.join(rdd2).collect
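The shape of a join() result is easier to see on a smaller data set. The sketch below uses made-up values (the names prices and labels are only for illustration) and shows that join() keeps only keys present in both RDDs and emits one output pair per matching combination.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Reuse the running SparkContext (spark-shell, notebook) or start a local one.
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[*]").setAppName("pair-rdd-join"))

// Key 110 appears twice on the left; key 105 exists only on the left,
// and key 127 exists only on the right.
val prices = sc.parallelize(Seq((110, 50.35), (110, 40.11), (105, 6.0)))
val labels = sc.parallelize(Seq((110, "a"), (127, "b")))

// Each result element has the form (key, (leftValue, rightValue)).
val joined = prices.join(labels).collect().toSet

joined.foreach(println)
```

Keys 105 and 127 are dropped because each has no partner on the other side, while key 110 yields two pairs, one for each of its values in prices.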

How to find the pair with the maximum value in an RDD?
val a = Array(("a",1), ("b",2), ("c",1), ("d",3))
val rdd = sc.parallelize(a)
val maxKey = rdd.takeOrdered(1)(Ordering[Int].reverse.on(_._2))
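As an alternative sketch (not the only way to do this), the same pair can be found with reduce() or with max() and an explicit Ordering. The variable names below are chosen for this example only.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Reuse the running SparkContext (spark-shell, notebook) or start a local one.
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[*]").setAppName("pair-rdd-max-value"))

val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("c", 1), ("d", 3)))

// Keep whichever pair of the two has the larger value.
val maxByReduce = pairs.reduce((x, y) => if (x._2 >= y._2) x else y)

// Let max() compare the pairs by their second element.
val maxByOrdering = pairs.max()(Ordering.by[(String, Int), Int](_._2))

println(maxByReduce)
println(maxByOrdering)
```

Unlike takeOrdered(1), which returns an Array holding one element, both calls return the pair itself.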
 
